{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[AWS SDK for pandas](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 22 - Writing Partitions Concurrently\n",
"\n",
"* `concurrent_partitioning` argument:\n",
"\n",
" If True, increases the parallelism level when writing the partitions, which decreases the\n",
" writing time at the cost of higher memory usage.\n",
"\n",
"*P.S. Check the [function API doc](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/api.html) to see which arguments can be configured through Global Configurations.*"
]
},
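{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch (assuming `concurrent_partitioning` is among the globally configurable arguments), the flag can also be enabled once through `wr.config` instead of being passed on every call:\n",
"\n",
"```python\n",
"import awswrangler as wr\n",
"\n",
"# Hypothetical global default: applies to subsequent dataset writes\n",
"# instead of passing concurrent_partitioning=True to each to_parquet call\n",
"wr.config.concurrent_partitioning = True\n",
"```"
]
},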
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"%reload_ext memory_profiler\n",
"\n",
"import awswrangler as wr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" ············\n"
]
}
],
"source": [
"import getpass\n",
"\n",
"bucket = getpass.getpass()\n",
"path = f\"s3://{bucket}/data/\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading 4 GB of CSV from NOAA's historical data and creating a year column"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Number of rows: 125407761\n",
"Number of columns: 9\n"
]
}
],
"source": [
"noaa_path = \"s3://noaa-ghcn-pds/csv/by_year/193\"\n",
"\n",
"cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n",
"dates = [\"dt\", \"obs_time\"]\n",
"dtype = {x: \"category\" for x in [\"element\", \"m_flag\", \"q_flag\", \"s_flag\"]}\n",
"\n",
"df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)\n",
"\n",
"df[\"year\"] = df[\"dt\"].dt.year\n",
"\n",
"print(f\"Number of rows: {len(df.index)}\")\n",
"print(f\"Number of columns: {len(df.columns)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Default Writing"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 22169.04 MiB, increment: 11119.68 MiB\n",
"CPU times: user 49 s, sys: 12.5 s, total: 1min 1s\n",
"Wall time: 1min 11s\n"
]
}
],
"source": [
"%%time\n",
"%%memit\n",
"\n",
"wr.s3.to_parquet(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"year\"],\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Concurrent Partitioning (Decreasing writing time, but increasing memory usage)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 27819.48 MiB, increment: 15743.30 MiB\n",
"CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s\n",
"Wall time: 41.6 s\n"
]
}
],
"source": [
"%%time\n",
"%%memit\n",
"\n",
"wr.s3.to_parquet(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"year\"],\n",
" concurrent_partitioning=True,  # <-----\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
}
},
"nbformat": 4,
"nbformat_minor": 4
}